#include <thread>
#include "zmq_messenger.h"
#include "app_common.h"
#include "loghelper.h"
#include "bytes_buffer.hpp"
#include "messagebus.h"

using namespace std;
using namespace RockLog;

ZMQMessenger::ZMQMessenger(const std::string &workerId)
    : m_context(1), m_publisher(m_context, ZMQ_PUB), m_workerId(workerId)
{
    if (m_workerId.length() != kWorkIdLength)
    {
        LOG(kErr) << "invalid workerId length!";
        return;
    }
}

void ZMQMessenger::init()
{
    std::string url = "ipc://" + m_workerId + ".ipc";
    m_publisher.bind(url); // Not usable on Windows.
    LOG(kInfo) << "bind url" << url;
    std::thread([this]()
                { this->subcribeMsg(); })
        .detach();
}

void ZMQMessenger::subcribeMsg()
{
    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_SUB);
    subscriber.connect(kZMQServerURL);
    subscriber.setsockopt(ZMQ_SUBSCRIBE, m_workerId.c_str(), m_workerId.length());
    LOG(kInfo) << "subcribe url:" << kZMQServerURL;

    while (true)
    {
        zmq::message_t msg;
        subscriber.recv(&msg);
        g_messagebus.sendMessage(kZmqRecvMSG, (const char*)msg.data(), (uint32_t)msg.size());
    }
}

void ZMQMessenger::publishMsg(const std::string &msg)
{
    zmq::message_t zmq_msg(msg.size() + kWorkIdLength);
    memcpy((char *)zmq_msg.data(), m_workerId.c_str(), kWorkIdLength);
    memcpy((char *)zmq_msg.data() + kWorkIdLength, msg.c_str(), msg.size());
    m_publisher.send(zmq_msg);
}

void ZMQMessenger::publishMsg(const char *msg, uint32_t len)
{
    zmq::message_t zmq_msg(len + kWorkIdLength);
    memcpy((char *)zmq_msg.data(), m_workerId.c_str(), kWorkIdLength);
    memcpy((char *)zmq_msg.data() + kWorkIdLength, msg, len);
    m_publisher.send(zmq_msg);
}